Black-Box Optimization of Traffic Light Control

Alleviating traffic congestion in urban areas is essential not only from an economic perspective but also for environmental preservation. Especially in areas with high-rise condominiums and large shopping malls, where many traffic flows concentrate, local congestion can propagate over a wide area, so mitigation measures must be comprehensive and consider the entire region.

This example program uses black-box optimization with Fixstars Amplify to control traffic signals optimally so as to maximize the average speed of all vehicles traveling in the city. Although we evaluate the objective function (the average vehicle speed) using a multi-agent simulation (MAS) in this example, the optimization would proceed in the same way if the objective function were instead the average vehicle speed observed in actual traffic conditions or the number of vehicles entering a particular intersection per unit time.

For basic knowledge of the black-box optimization with machine learning and quantum annealing/Ising machines used here, see "Black-Box Optimization with Quantum Annealing and Ising Machines". For other applied model cases utilizing this black-box optimization method, see:

  • Black-Box Optimization of Operating Condition in a Chemical Reactor
  • Black-Box Optimization of Airfoil Geometry by Fluid Flow Simulation

Also, please refer to QUBO-based traffic light optimization for an approach similar to the signal control treated in this sample program, where the signal control is considered as a combinatorial optimization problem based on the QUBO formulation.

This sample program (optimal signal control by black-box optimization) consists of the following:

  • 1. MAS for urban congestion
    • 1.1. What is MAS?
    • 1.2. Traffic congestion model
    • 1.3. MAS traffic simulator
  • 2. FMQA program implementation (integer input values)
    • 2.1. Random seed initialization
    • 2.2. Configuration of Amplify client
    • 2.3. Implementing FM with PyTorch
    • 2.4. Construction of initial training data
    • 2.5. Execution class for FMQA cycle (non-binary decision variables)
    • 2.6. One-hot encoder/decoder implementation
  • 3. Optimization of traffic light control
    • 3.1. FMQA execution example
    • 3.2. Transition of objective function values during the optimization process
    • 3.3. MAS traffic simulation with optimized traffic light settings
    • 3.4. Example output from this FMQA sample program
  • 4. Summary and practical guidelines

*In this online demo and tutorial environment, the continuous execution time is limited to about 20 minutes. If you expect the execution time to exceed 20 minutes, for example, when trying optimization under different conditions, please copy this sample program to your local environment before execution. In that case, also download the MAS traffic simulator files as appropriate and save them in the following directory structure.

├ fmqa_4_traffic.ipynb (this program)
└ utils/
     ├ __init__.py (blank file)
     └ traffic_model.py

1. MAS for urban congestion

1.1. What is MAS?

Multi-agent simulation (MAS) is a simulation technique that mimics the behavior of multiple interacting agents and can be employed to reproduce complex real-world systems and phenomena. In a simulation, each agent makes individual decisions and behaves in response to its surroundings, and the result as a whole is characterized by the complex interactions among the agents.

MAS is used in a variety of fields. Examples include the following areas:

  • Social systems
    MAS is used to study human behavior and social interactions and to understand the impact of policies and social systems. We can use MAS in economics, sociology, and political science.

  • Transportation systems
    Simulates the movement of vehicles and pedestrians within a transportation network to study measures to optimize traffic flow and reduce congestion.

  • Natural Disasters
    Natural disasters such as earthquakes, tsunamis, and volcanic eruptions are simulated and used to assess damage and study disaster prevention measures.

  • Organizational Dynamics
    Mimics the interactions of individuals and departments within a company or organization to analyze the impact of organizational efficiency and decision-making.

In this section, we describe MAS as applied to urban automobile traffic and perform actual traffic simulations.

1.2. Traffic congestion model

When simulating urban car traffic with MAS, each car must make individual decisions. In other words, control is exercised not from the viewpoint of an observer who can see the entire simulation but from the perspective of each car: each decision is based on the distance to the car in front (and, if necessary, behind). The key here is the following model, called the optimal speed model (Bando et al., Jpn. J. Ind. Appl. Math. (1994); Bando et al., Phys. Rev. E (1995)).

$$ \ddot x_i = a \{ V(\Delta x_i) - \dot x_i \} $$

$$ \Delta x_i = x_{i+1} - x_i $$

$$ v_i = \dot x_i $$

Here, $x_i$ and $v_i$ are the position and speed of the $i$-th car (you), and $i+1$ is the index of the vehicle in front of you. Also, $V(\Delta x_i)$ denotes the optimal target speed at the current distance $\Delta x_i$ from the car in front of you. Finally, $a$ is the sensitivity, a parameter corresponding to the speed of the driver's reflexes. This models the driver's acceleration and deceleration toward the optimal speed determined from the distance between vehicles. This example code uses the following model for the optimal speed.

$$ V(\Delta x_i)=\tanh(\Delta x_i-2) + \tanh(2) $$

In this simulation, each car (driver) implements speed control based on these model equations and drives autonomously to the destination while observing signals along the route. Therefore, we expect that this simulation naturally reproduces traffic congestion caused directly or indirectly by the presence of large commercial facilities, etc.
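As an illustration, the model equations above can be sketched in a few lines of NumPy. This is a hypothetical standalone sketch, not the implementation inside `utils/traffic_model.py`; the function names `optimal_speed` and `acceleration` are our own.

```python
import numpy as np

def optimal_speed(dx):
    """Optimal target speed V(dx) = tanh(dx - 2) + tanh(2)."""
    return np.tanh(dx - 2.0) + np.tanh(2.0)

def acceleration(x, v, road_length, a=1.0):
    """Acceleration of each car, a * (V(dx_i) - v_i), on a periodic road.

    x, v: positions and speeds of the cars, sorted by position.
    The car in front of the last car is the first one (periodic boundary).
    """
    dx = np.roll(x, -1) - x  # headway to the car in front
    dx[-1] += road_length    # wrap-around headway for the last car
    return a * (optimal_speed(dx) - v)
```

If all cars are equally spaced and already driving at $V(\Delta x)$, the acceleration vanishes everywhere, i.e., uniform flow is a steady state; congestion in this model is known to emerge as an instability of that state when the spacing becomes small relative to the sensitivity $a$.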

1.3. MAS traffic simulator

We will now introduce the MAS-based traffic simulator TrafficSimulator2Malls that we will use here. The city considered in this simulator has a simple shape consisting of city_size$\times$city_size blocks, with roads in a grid pattern between the blocks. All intersections have traffic lights, meaning we can control the signals installed at city_size$\times$city_size intersections. The city has two popular shopping malls, so heavy local traffic is likely around them. To simplify the problem, the boundaries surrounding the city are periodic (e.g., a car that leaves the city on the left re-enters from the right).

Our sample code considers three control parameters for the traffic signal at each intersection: [red light length, green light length, phase] (all in seconds). For example, if you use [15.0, 12.0, 5.0] as the control parameters for the traffic light at intersection $i$, then **5.0 seconds after the simulation start time, the north-south signal at that intersection turns red for the next 15.0 seconds, then green for 12.0 seconds, then red for 15.0 seconds, then green for 12.0 seconds, and so on.**
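The timing rule just described can be sketched as a small helper. This is a hypothetical function for illustration only, not part of TrafficSimulator2Malls; we additionally assume the light stays green until the phase offset has elapsed, which the description above does not state explicitly.

```python
def ns_signal_is_red(t, red, green, phase):
    """True if the north-south light is red at time t (seconds), given the
    control parameters [red light length, green light length, phase].

    Assumption (illustrative, not from the simulator): the light is green
    until the phase offset has elapsed.
    """
    if t < phase:
        return False
    # After the offset, the light cycles: red for `red` s, green for `green` s.
    return (t - phase) % (red + green) < red
```

With the example parameters [15.0, 12.0, 5.0], this gives red from t = 5 s to 20 s, green from 20 s to 32 s, red again from 32 s to 47 s, and so on.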

Here, the input parameters to the traffic lights are real numbers. From the standpoint of applicability to black-box optimization, we use integer indices corresponding to these real numbers as the simulation input. To understand the integer index, consider a real-number parameter that can take values between 0.0 and 1.0, represented by six integer indices. The correspondence between each integer index and the parameter value would be as follows.

| Integer index value | 0 | 1 | 2 | 3 | 4 | 5 |
| --- | --- | --- | --- | --- | --- | --- |
| Parameter value | 0.0 | 0.2 | 0.4 | 0.6 | 0.8 | 1.0 |
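Such a table is simply a uniform discretization of the parameter range, which could be generated with np.linspace. This is a sketch of the idea only; SignalController builds its own table internally.

```python
import numpy as np

nbins = 6                              # number of integer indices
values = np.linspace(0.0, 1.0, nbins)  # parameter value for each index
for idx, val in enumerate(values):
    print(f"index {idx} -> value {val:.1f}")
```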

The correspondence between integer indices and real-number parameters is defined in the parameter generation class for signal control, SignalController. In the default configuration, the three parameters for each traffic light all take values between 1 and 20 seconds, represented by 20 integer indices. Running the following code cell displays the correspondence table between the integer indices and their parameter values. For example, with the default settings, the parameter value corresponding to integer index 10 is 11.0 seconds.

In [ ]:
%matplotlib widget
from utils.traffic_model import SignalController

# Specifies the size of the grid city. Cities are represented by a grid map of city_size*city_size
city_size = 3

# Instantiate the class to generate signal control parameters for a given city size
signal_controller = SignalController(city_size)

# Display the table of correspondence between integer indices and parameter values
signal_controller.show_index_table()

Now that we have instantiated the class that maps integer indices to real-number parameters, we can use this class to create traffic light control parameters (real numbers). In the following, we generate integer indices randomly using integer random numbers from 0 to 19. The number of integer indices we need to generate is the [number of parameters for each traffic light] $\times$ [number of intersections] = 3 $\times$ city_size $\times$ city_size.

Next, we convert (decode) the generated integer indices to the corresponding real-number parameter values using the decode() method of the SignalController class. Passing disp_flag=True to decode() displays the parameter values after decoding. Let's check which values were randomly chosen as signal control parameters.

In [ ]:
import numpy as np

# Generate [3 * city_size * city_size] integer indices ranging from 0 to 19 using integer random numbers
index_list = np.random.randint(0, 20, 3 * city_size * city_size)

# Convert the integer indices to corresponding parameter values (decode)
signal_info = signal_controller.decode(index_list=index_list, disp_flag=True)

Finally, we perform a traffic simulation using the MAS traffic simulator class TrafficSimulator2Malls, where traffic signals are controlled based on the generated control parameter signal_info. When instantiating TrafficSimulator2Malls, we set the upper limit of the number of cars traveling in a grid city to 20 (num_cars=20), with 50% of the vehicles having a destination in one of the two shopping malls (ratio_mall=0.5). The origin and destination of each car considered in the traffic simulation are determined using random numbers, and the fourth argument, seed, corresponds to the seed value of the random numbers.

Here, during the simulation, each car makes round trips between (1) its "home," which is set by a random number, and (2) a randomly chosen location or a shopping mall. After arriving at its destination, a car remains at that location for a certain period and is hidden from the road until the return trip begins. Each displayed car is colored according to its route and makes a round trip along one of the following paths:

  • Gray $\rightarrow$ Round trip between home and Shopping Mall 1
  • Black $\rightarrow$ Round trip between home and Shopping Mall 2
  • White $\rightarrow$ Round trip between home and another location

The current destination of each car on the road is indicated by a corresponding colored circle, except when the destination is a shopping mall. Observe how each vehicle drives autonomously to its destination while obeying the traffic signals.

Also, although not an essential part of this optimization, all simulation results are displayed in SI units (m, s, m/s).

In [ ]:
from utils.traffic_model import TrafficSimulator2Malls

# Instantiate simulator
traffic_sim = TrafficSimulator2Malls(
    signal_info=signal_info, num_cars=20, ratio_mall=0.5, seed=0
)

# Run the simulation (display result in animation)
traffic_sim.run(steps=500, animation=True)

# Run simulation (output gif animation)
# *About 1 minute of processing time with the following argument settings.
# *Please try gif output in local environment.
# traffic_sim.run(steps=200, gif="./anim.gif")

2. FMQA program implementation (integer input values)

This section describes the program implementation of FMQA, much of which is the same as in "Black-Box Optimization with Quantum Annealing/Ising Machines". Please refer to that document for details.

Please note that the only difference is that the present black-box optimization considers integer vectors as input values to the objective function instead of binary variables. For the handling of integer input values in FMQA, see "One-hot encoder/decoder implementation".

2.1. Random seed initialization

Define a function seed_everything to initialize random seed values to ensure that the initial training data and machine learning results do not change with each run.

In [ ]:
import os
import torch


def seed_everything(seed=0):
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

2.2. Configuration of Amplify client

Here, we create an Amplify client and set the necessary parameters. In the following, we set the timeout for a single search by the Ising machine to 5 seconds.

In [ ]:
from amplify import FixstarsClient
from datetime import timedelta

client = FixstarsClient()
client.parameters.timeout = timedelta(milliseconds=5000)  # 5000 msec
# client.token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"  # If you use Amplify in a local environment or Google Colaboratory, enter your Amplify API token.

2.3. Implementing FM with PyTorch

This example code implements FM with PyTorch. In the TorchFM class, we define the acquisition function $g(x)$ as a machine learning model. Each term in $g(x)$ directly corresponds to out_lin, out_1, out_2, and out_inter in the TorchFM class, as in the following equation.

$$ \begin{aligned} g(\boldsymbol{x} | \boldsymbol{w}, \boldsymbol{v}) &= \underset{\color{red}{\mathtt{out\_lin}}}{\underline{ w_0 + \sum_{i=1}^n w_i x_i} } + \underset{\color{red}{\mathtt{out\_inter}}}{\underline{\frac{1}{2} \left[\underset{\color{red}{\mathtt{out\_1}}}{\underline{ \sum_{f=1}^k\left(\sum_{i=1}^n v_{i f} x_i\right)^2 }} - \underset{\color{red}{\mathtt{out\_2}}}{\underline{ \sum_{f=1}^k\sum_{i=1}^n v_{i f}^2 x_i^2 }} \right] }} \end{aligned} $$

In [ ]:
import torch.nn as nn


class TorchFM(nn.Module):
    def __init__(self, d: int, k: int) -> None:
        super().__init__()
        self.V = nn.Parameter(torch.randn(d, k), requires_grad=True)
        self.lin = nn.Linear(d, 1)

    def forward(self, x):
        out_1 = torch.matmul(x, self.V).pow(2).sum(1, keepdim=True)
        out_2 = torch.matmul(x.pow(2), self.V.pow(2)).sum(1, keepdim=True)
        out_inter = 0.5 * (out_1 - out_2)
        out_lin = self.lin(x)
        out = out_inter + out_lin
        return out

Next, a function train() is defined to train the FM based on the training data sets. As in general machine learning methods, this function divides the data sets into training and validation data, optimizes the FM parameters using the training data, and validates the model during training using the validation data. The train() function returns the model with the highest prediction accuracy for the validation data.

In [ ]:
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from typing import Type

import copy


def train(
    X: np.ndarray,
    y: np.ndarray,
    model_class: Type[nn.Module],
    model_params: dict[str, int | float],
    batch_size=1024,
    epochs=3000,
    criterion=nn.MSELoss(),
    optimizer_class=torch.optim.AdamW,
    opt_params={"lr": 1},
    lr_sche_class=None,
    lr_sche_params=None,
):
    X_tensor, y_tensor = (
        torch.from_numpy(X).float(),
        torch.from_numpy(y).float(),
    )
    indices = np.array(range(X.shape[0]))
    indices_train, indices_valid = train_test_split(
        indices, test_size=0.2, random_state=42
    )

    train_set = TensorDataset(X_tensor[indices_train], y_tensor[indices_train])
    valid_set = TensorDataset(X_tensor[indices_valid], y_tensor[indices_valid])
    loaders = {
        "train": DataLoader(train_set, batch_size=batch_size, shuffle=True),
        "valid": DataLoader(valid_set, batch_size=batch_size, shuffle=False),
    }

    model = model_class(**model_params)
    best_model_wts = copy.deepcopy(model.state_dict())
    optimizer = optimizer_class(model.parameters(), **opt_params)
    scheduler = None
    if lr_sche_class is not None:
        scheduler = lr_sche_class(optimizer, **lr_sche_params)
    best_score = 1e18
    for _ in range(epochs):
        losses = {"train": 0.0, "valid": 0.0}

        for phase in ["train", "valid"]:
            if phase == "train":
                model.train()
            else:
                model.eval()

            for batch_x, batch_y in loaders[phase]:
                optimizer.zero_grad()
                out = model(batch_x).T[0]
                loss = criterion(out, batch_y)
                losses[phase] += loss.item() * batch_x.size(0)

                with torch.set_grad_enabled(phase == "train"):
                    if phase == "train":
                        loss.backward()
                        optimizer.step()

            losses[phase] /= len(loaders[phase].dataset)

        with torch.no_grad():
            model.eval()
            if best_score > losses["valid"]:
                best_model_wts = copy.deepcopy(model.state_dict())
                best_score = losses["valid"]
        if scheduler is not None:
            scheduler.step()

    with torch.no_grad():
        model.load_state_dict(best_model_wts)
        model.eval()

    out = model(X_tensor).T[0]
    coef = torch.corrcoef(torch.stack((out, y_tensor)))[0, 1].detach()
    return model, coef.item()

2.4. Construction of initial training data

The gen_training_data_flow_onehot function evaluates the objective function $f(\boldsymbol{x})$ for input values $\boldsymbol{x}$ to produce $N_0$ input-output pairs (initial training data). We can determine the input values $\boldsymbol{x}$ in various ways, such as using random numbers or values suitable for machine learning based on prior knowledge. You can also build up the training data from the results of previous experiments or simulations.

To handle non-binary integer input values, we use the input_generator function, which generates an integer input vector with random numbers and converts it to a binary vector based on one-hot encoding. The function input_generator is defined in the one-hot encoder/decoder class EncDecOneHot, which we will introduce later.

In [ ]:
def gen_training_data_flow_onehot(D: int, N0: int, true_func, input_generator):
    X = np.zeros((N0, D))
    y = np.zeros(N0)
    print("")
    print("###################################################")
    print(" Making initial training data")
    print("###################################################")
    for i in range(N0):
        print(f"Generating training data set #{i}.")
        # Randomly determine input vector consisting of (0 or 1)
        x = input_generator()
        # If the same input vector already exists, regenerate the input values
        is_identical = True
        while is_identical:
            is_identical = False
            for j in range(i + 1):
                if np.all(x == X[j, :]):
                    x = input_generator()
                    is_identical = True
                    break
        # Evaluate input value x with objective function f(x) and copy input-output pair (x,f(x)) to training data
        X[i, :] = x
        y[i] = true_func(x)
        print(f"------------------------")
    return X, y

2.5. Execution class for FMQA cycle (non-binary decision variables)

FMQA.cycle() executes the FMQA cycle $N-N_0$ times using the pre-prepared initial training data. FMQA.step() is a function that performs a single FMQA cycle and is called $N-N_0$ times by FMQA.cycle().

For FMQA with non-binary variables, we need to encode each real/integer variable as a set of binary variables. The FMQA_NB class assumes one-hot encoding. The encoder and decoder for the non-binary variables are specified in this class as encoder_decoder.encoder and encoder_decoder.decoder, respectively. Also, nbins_array is a list of the total (variable) number of bins used to encode each non-binary variable.

In [ ]:
from amplify import VariableGenerator, solve, one_hot, sum
import matplotlib.pyplot as plt
import sys
import time

# FMQA for non-binary inputs


class FMQA_NB:
    def __init__(self, D, N, N0, k, true_func, client, encoder_decoder) -> None:
        assert N0 < N
        self.D = D
        self.N = N
        self.N0 = N0
        self.k = k
        self.true_func = true_func
        self.client = client
        self.nbins_array = encoder_decoder.nbins_array
        self.encoder = encoder_decoder.encoder
        self.decoder = encoder_decoder.decoder
        self.y = None

    # A member function that repeatedly performs the FMQA cycle (N - N0) times, adding new training data at each cycle
    def cycle(self, X, y, log: bool = False) -> np.ndarray:
        # Weight for one-hot constraint
        constraint_weight = max([1, int(np.abs(y).max() + 0.5)])
        print("")
        print("###################################################")
        print(f" Starting FMQA cycles... {constraint_weight=}")
        print("###################################################")

        pred_x = X[0]
        pred_y = 1e18
        self.y = y
        i_sta = 0
        for i in range(self.N - self.N0):
            print(f"FMQA Cycle #{i} ")
            start_time = time.perf_counter()
            try:
                x_hat = self.step(X, y, constraint_weight)
            except RuntimeError:
                sys.exit(f"Unknown error, i = {i}")
            # If an input value identical to the found x_hat already exists in the current training data set, a neighboring value is used as the new x_hat.
            is_identical = True
            while is_identical:
                is_identical = False
                for j in range(i + self.N0):
                    if np.all(x_hat == X[j, :]):
                        # Decode binary vector to integer vector and copy to inputs
                        inputs = self.decoder(x_hat)
                        # Get the surrounding values (inputs is an integer vector)
                        inputs += np.random.randint(-1, 2, len(inputs))
                        for i_inp in range(len(inputs)):
                            if inputs[i_inp] < 0:
                                inputs[i_inp] = 0
                            elif inputs[i_inp] > self.nbins_array[i_inp] - 1:
                                inputs[i_inp] = self.nbins_array[i_inp] - 1
                        # Encode from integer vector to binary vector and copy to x_hat
                        x_hat = self.encoder(inputs)
                        if log:
                            print(f"{i=}, Identical x is found, {x_hat=}")
                        is_identical = True
                        break
            # Evaluate the objective function f() with x_hat
            y_hat = self.true_func(x_hat)
            # Add the input-output pair [x_hat, y_hat] to the training data set
            X = np.vstack((X, x_hat))
            y = np.append(y, y_hat)
            self.y = np.append(self.y, y_hat)
            # Copy the input-output pair to [pred_x, pred_y] when the evaluated value of the objective function updates the minimum value
            print(
                f"FMQA Cycle #{i} finished in {time.perf_counter() - start_time:.2f}s",
                end="",
            )
            if pred_y > y_hat:
                pred_y = y_hat
                pred_x = x_hat
                print(f", variable updated, {pred_y=:.2f}")
            else:
                print()
            print(f"------------------------")
        return pred_x

    # Member function to perform one FMQA cycle
    def step(self, X, y, constraint_weight) -> np.ndarray:
        # Train FM
        start_time = time.perf_counter()
        model, corrcoef = train(
            X,
            y,
            model_class=TorchFM,
            model_params={"d": self.D, "k": self.k},
            batch_size=8,
            epochs=1000,
            criterion=nn.MSELoss(),
            optimizer_class=torch.optim.AdamW,
            # Set scheduler to reduce learning rate with number of epochs
            opt_params={"lr": 0.5},
            lr_sche_class=torch.optim.lr_scheduler.StepLR,
            lr_sche_params={"step_size": 50, "gamma": 0.9},
        )
        print(
            f"FM Training: {time.perf_counter() - start_time:.2f}s (corr:{corrcoef:.2f})"
        )
        start_time = time.perf_counter()
        # Extract FM parameters from the trained FM model
        v, w, w0 = list(model.parameters())
        v = v.detach().numpy()
        w = w.detach().numpy()[0]
        w0 = w0.detach().numpy()[0]
        gen = VariableGenerator()  # Instantiate Variable Generator
        q = gen.array("Binary", self.D)  # Generate a binary variable array
        cost = self.__FM_as_QUBO(
            q, w0, w, v
        )  # Define FM as a QUBO equation from FM parameters
        # Construct one-hot constraints for non-binary variables
        constraint_list = []
        ista = 0
        iend = 0
        for i in range(len(self.nbins_array)):
            iend += self.nbins_array[i]
            constraint_list.append(one_hot(q[ista:iend]))
            ista = iend
        constraints = sum(constraint_list)
        # Add up the objective function and constraints, and pass it to the Fixstars Amplify solver
        model = cost + constraint_weight * constraints
        result = solve(model, self.client)
        print(
            f"QUBO: {time.perf_counter() - start_time:.2f}s (AE execution: {result.execution_time.seconds:.2f}s)"
        )
        if len(result.solutions) == 0:
            raise RuntimeError("No solution was found.")
        q_values = q.evaluate(result.best.values)
        return q_values

    # A function that defines FM as a QUBO equation from FM parameters. As with the previously defined TorchFM class, the formula is written as per the acquisition function form of g(x).
    def __FM_as_QUBO(self, x, w0, w, v):
        lin = w0 + (x.T @ w)
        out_1 = np.array([(x * v[:, i]).sum() ** 2 for i in range(self.k)]).sum()
        # Note that x[j] = x[j]^2 since x[j] is a binary variable in the following equation.
        out_2 = np.array([(x * v[:, i] * v[:, i]).sum() for i in range(self.k)]).sum()
        return lin + (out_1 - out_2) / 2

    # A function to plot the history of i-th objective function evaluations performed within the initial training data construction (blue) and during FMQA cycles (red).
    def plot_history(self):
        assert self.y is not None
        fig = plt.figure(figsize=(6, 4))
        plt.plot(
            [i for i in range(self.N0)],
            self.y[: self.N0],
            marker="o",
            linestyle="-",
            color="b",
        )  # Objective function evaluation values at the time of initial training data generation (random process)
        plt.plot(
            [i for i in range(self.N0, self.N)],
            self.y[self.N0 :],
            marker="o",
            linestyle="-",
            color="r",
        )  # Objective function evaluation values during the FMQA cycles (FMQA cycle process)
        plt.plot(
            [i for i in range(self.N)],
            [self.y[:i].min() for i in range(1, self.N + 1)],
            linestyle="-",
            color="k",
        )  # Update history of minimum objective function values
        plt.xlabel("i-th evaluation of f(x)", fontsize=18)
        plt.ylabel("f(x)", fontsize=18)
        plt.tick_params(labelsize=18)
        return fig

2.6. One-hot encoder/decoder implementation

As introduced in section 1.3, an integer index is required as input when using the parameter generation class for signal control SignalController. On the other hand, the input to QUBO must be a binary variable, so encoding from integer indices to binary variables is required. In this tutorial, we will use a one-hot encoding. For example, in one-hot encoding with a 4-bit representation of an integer that can take values from 1 to 4,

  • 1 is encoded to [1, 0, 0, 0]
  • 2 is encoded to [0, 1, 0, 0]
  • 3 is encoded to [0, 0, 1, 0]
  • 4 is encoded to [0, 0, 0, 1]
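The bullet mapping above is just a shifted identity: set the bit at position (value - 1). A minimal standalone sketch with hypothetical helper names, independent of the EncDecOneHot class implemented in this notebook:

```python
import numpy as np

def one_hot_encode(value, n_bins=4, offset=1):
    """One-hot encode an integer in [offset, offset + n_bins - 1]."""
    x = np.zeros(n_bins, dtype=int)
    x[value - offset] = 1
    return x

def one_hot_decode(x, offset=1):
    """Recover the integer from its one-hot vector."""
    return int(np.argmax(x)) + offset
```

For instance, one_hot_encode(2) gives [0, 1, 0, 0], and decoding that vector returns 2.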

The following EncDecOneHot class defines functions for converting an integer index to binary variables and vice versa (encoder(), decoder()), as well as a function gen_random_input() that generates a random integer-index input vector and converts it to a binary vector.

In [ ]:
class EncDecOneHot:
    def __init__(self, D: int, nbins_array):
        self.D = D
        self.nbins_array = nbins_array

    def gen_random_input(self):
        ista = 0
        iend = 0
        x = np.zeros(self.D, int)
        for i in range(len(self.nbins_array)):
            iend += self.nbins_array[i]
            idx = np.random.randint(ista, iend, 1)[0]
            x[idx] = 1
            ista = iend
        return x

    def encoder(self, inputs):
        if len(inputs) != len(self.nbins_array):
            raise RuntimeError("inputs should be the same length as nbins_array!")
        x = np.zeros(self.D, int)
        i_offset = 0
        for i in range(len(inputs)):
            x[inputs[i] + i_offset] = 1
            i_offset += self.nbins_array[i]
        return x

    def decoder(self, x):
        bit_locs = np.where(x == 1)[0]
        if len(bit_locs) != len(self.nbins_array):
            raise RuntimeError("bit_locs should be the same length as nbins_array!")
        i_offset = 0
        for i in range(1, len(bit_locs)):
            i_offset += self.nbins_array[i - 1]
            bit_locs[i] -= i_offset
        return bit_locs

3. Optimization of traffic light control

3.1. FMQA execution example

Now, we use the black-box optimization class FMQA_NB implemented in Section 2 to optimize the control of the traffic light network installed in the target city. Here, the objective function is the negative of the average speed of all vehicles predicted by the MAS traffic simulator (Section 1). Since the optimization minimizes the objective function value, the optimized traffic light control is expected to maximize the average speed of all vehicles. We can obtain the average vehicle speed after a simulation with get_latest_mean(). For each objective function evaluation during the optimization process, n_cases simulation cases with randomly determined initial conditions (departure and destination locations) are considered, and the objective function returns their average as the objective function value.

In the following code, due to the runtime limit in the demo/tutorial environment, the number of times that we can evaluate the objective function $N$ is 3, of which $N_0$ is 2 for generating the initial data (thus, a total of $N-N_0=1$ optimization cycle).

Although the limited number of optimization cycles ($N - N_0 = 1$) may not always find a better solution than the initial data corresponding to a random search, you can see how the entire optimization progresses.

To perform optimization properly considering a sufficient number of cycles, please download the sample code and run optimization in a local environment after increasing the number of evaluations of the objective function $N$ (our demo/tutorial environment limits the runtime up to 20 minutes). For example, for $N=100$, you can achieve an average vehicle speed of about 8 m/s at the end of the optimization process. In this case, it takes about 2-3 hours for the entire optimization cycle to complete.

An example output for $N=100$ and a simulation animation comparing traffic simulation results with and without optimized traffic light control are shown in "3.4. Example output from this FMQA sample program".

In [ ]:
import time

# Initialize random seed value
seed_everything()

# Specify the size of the grid city, a [city_size*city_size] lattice map
city_size = 4

# Instantiate the class to generate signal control parameters for a given city size
signal_controller = SignalController(city_size)

# 1D array of bin sizes in one-hot encoding for each variable
nbins_array = np.array(
    [
        [
            signal_controller.nbins_red_duration,
            signal_controller.nbins_green_duration,
            signal_controller.nbins_phase,
        ]
        for _ in range(city_size**2)
    ]
).reshape(-1)

# Overall size of binary variables (size of binary decision variables handled by QUBO)
D = nbins_array.sum()
print(f"Number of binary decision variables: {D}")

# Instantiate One-hot encoder-decoder class (EncDecOneHot)
enc_dec_one_hot = EncDecOneHot(D, nbins_array)

# Counter for the number of times the objective function is evaluated
n_eval = 0

num_cars = 100
ratio_mall = 0.5


# Objective function (obtain traffic light control parameters --> run traffic simulation --> calculate objective function value)
def obj_func(x):
    global n_eval
    # Convert from the solution (binary) to integer indices representing the traffic light control parameters (red period, green period, phase)
    index_list = enc_dec_one_hot.decoder(x)

    # The signal_controller.decode method then converts the integer indices to signal control parameters (real numbers)
    signal_info = signal_controller.decode(index_list)

    # Run simulations with different initialization for n_cases times with the same signal control
    n_cases = 5
    costs = []
    print("Cost convergence: ", end="")
    start_time = time.perf_counter()
    for i in range(n_cases):
        # Instantiate TrafficSimulator2Malls (seed value to determine initial conditions is a random number)
        traffic_sim = TrafficSimulator2Malls(
            signal_info=signal_info,
            num_cars=num_cars,
            ratio_mall=ratio_mall,
            seed=np.random.randint(0, 100),
        )
        # Run the simulation
        traffic_sim.run(steps=20000)
        # Get the negative value of the average speed of all cars in each simulation and append to costs
        costs.append(-traffic_sim.get_latest_mean())
        print(f"{sum(costs) / (i + 1):.2f} ", end="")

    # The average output value of the "n_cases" simulations is the final objective function value.
    cost = sum(costs) / n_cases
    print(
        f"\nEvaluation #{n_eval} finished in {time.perf_counter() - start_time:.2f}s. cost:{cost:.2f}"
    )
    n_eval += 1

    return cost


# N and N0 should be generally increased for meaningful optimization. See the description in Sec. 3.1 above.
N = 3  # Number of times the objective function can be evaluated
N0 = 2  # Number of samples in initial training data
k = 20  # Dimensions of vectors in FM (hyperparameters)

# Start time for optimization
start_time = time.perf_counter()

# Generate initial training data
X, y = gen_training_data_flow_onehot(D, N0, obj_func, enc_dec_one_hot.gen_random_input)

# Instantiate the FMQA class
fmqa_solver = FMQA_NB(
    D=D,
    N=N,
    N0=N0,
    k=k,
    true_func=obj_func,
    client=client,
    encoder_decoder=enc_dec_one_hot,
)

# Execute FMQA cycles
pred_x = fmqa_solver.cycle(X, y)

# Output optimization results
print("###################################################")
print(" Optimal input values and corresponding output")
print("###################################################")
print(f"pred x: {pred_x}")
print(f"pred value: {obj_func(pred_x):.2f}")
print(f"Elapsed time: {time.perf_counter() - start_time:.2f}s")
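The one-hot decoding that obj_func relies on can be illustrated with a toy sketch. This is a self-contained illustration with made-up sizes; the notebook's EncDecOneHot class implements the same idea with its own interface:

```python
import numpy as np

# Each variable occupies a contiguous block of bits in the QUBO vector;
# the position of the single 1 within its block is that variable's
# integer index. Toy sizes below are illustrative only.
nbins_array = np.array([3, 4, 2])          # bin counts for 3 variables
x = np.array([0, 1, 0, 0, 0, 0, 1, 1, 0])  # exactly one 1 per block

# Start offset of each variable's block
offsets = np.concatenate(([0], np.cumsum(nbins_array)[:-1]))

# Decode: position of the 1 within each block
index_list = [int(np.argmax(x[o : o + n])) for o, n in zip(offsets, nbins_array)]
print(index_list)  # → [1, 3, 0]
```

The one-hot constraint (exactly one 1 per block) is what makes argmax within each block a valid decoder.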

3.2. Transition of objective function values during the optimization process¶

Below, the $N_0$ objective function values obtained for randomly generated input values during initial training data generation are shown in blue, and the evolution of the objective function values over the $N-N_0$ FMQA optimization cycles is shown in red. An example output is explained in "3.4. Example output from this FMQA sample program". With a sufficiently large $N$, the plot shows how the minimum objective function value is successively updated by the input value $\hat{x}$ obtained in each FMQA optimization cycle.

In [ ]:
fig = fmqa_solver.plot_history()

3.3. MAS traffic simulation with optimized traffic light settings¶

We rerun the traffic simulation with the optimal control parameters obtained from the black-box optimization. An example animation of such a simulation is shown in "3.4. Example output from this FMQA sample program".

In [ ]:
# Same condition as the simulation during optimization
city_size = 4
num_cars = 100
ratio_mall = 0.5
signal_controller = SignalController(city_size)

# Construct integer indices from optimization result pred_x
index_list = enc_dec_one_hot.decoder(pred_x)
print(f"{index_list=}")

# Convert integer indices to signal control parameters (real numbers)
signal_info = signal_controller.decode(index_list)

# Instantiate simulator
traffic_sim = TrafficSimulator2Malls(
    signal_info=signal_info, num_cars=num_cars, ratio_mall=ratio_mall
)

# Run the traffic simulation with the optimized signal control
traffic_sim.run(2000, animation=True, interval=100)
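For a fair before/after comparison like the one in Section 3.4, both simulations should start from identical initial conditions, which the simulator's seed argument controls. The idea can be shown with a minimal self-contained sketch, where toy_sim is a hypothetical stand-in for a seeded simulator returning a mean speed:

```python
import numpy as np

# Hypothetical stand-in for a seeded traffic simulator: the seed fixes
# the "initial conditions" (here, just the noise realization).
def toy_sim(signal_quality, seed):
    rng = np.random.default_rng(seed)
    noise = rng.normal(0.0, 0.5)  # same seed -> same noise in both runs
    return signal_quality + noise

seed = 42
v_optimized = toy_sim(8.5, seed)  # e.g., optimized control
v_baseline = toy_sim(6.9, seed)   # e.g., best random-search control

# With a shared seed, the difference reflects only the control change:
print(f"{v_optimized - v_baseline:.2f}")  # → 1.60
```

Without a shared seed, part of the measured difference would come from different initial conditions rather than from the control parameters themselves.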

3.4. Example output from this FMQA sample program¶

In general, the solutions obtained are not entirely reproducible due to the heuristic nature of the algorithm used in FixstarsClient. Still, typical standard output and image output obtained when executing this example code with $N=100, N_0=10$ are shown below. Note that the values you obtain may differ.

  • If you execute the code described in "3.1. FMQA execution example" with "$N=100, N_0=10$", the following standard output will appear sequentially.

    1. First, the results (cost) of the traffic simulations with randomly generated traffic light controls are output sequentially during initial training data generation ($N_0$ times).

      Number of binary decision variables: 960
      
        ###################################################
        Making initial training data
        ###################################################
        Generating training data set #0.
        Cost convergence: -2.40 -3.41 -3.32 -3.53 -3.95 
        Evaluation #0 finished in 52.88s. cost:-3.95
        ------------------------
        Generating training data set #1.
        Cost convergence: -6.56 -6.06 -6.13 -6.00 -5.50 
        Evaluation #1 finished in 50.06s. cost:-5.50
        ------------------------
        Generating training data set #2.
        Cost convergence: -6.19 -6.58 -6.59 -6.56 -6.49 
        Evaluation #2 finished in 47.57s. cost:-6.49
        ------------------------
        Generating training data set #3.
        Cost convergence: -4.18 -4.01 -4.07 -4.10 -4.13 
        Evaluation #3 finished in 53.65s. cost:-4.13
        ------------------------
        Generating training data set #4.
        Cost convergence: -3.79 -4.32 -4.19 -3.73 -3.90 
        Evaluation #4 finished in 55.38s. cost:-3.90
        ------------------------
        Generating training data set #5.
        Cost convergence: -1.68 -3.16 -3.20 -3.22 -3.06 
        Evaluation #5 finished in 56.32s. cost:-3.06
        ------------------------
        Generating training data set #6.
        Cost convergence: -6.94 -5.61 -5.75 -5.71 -5.90 
        Evaluation #6 finished in 50.48s. cost:-5.90
        ------------------------
        Generating training data set #7.
        Cost convergence: -5.38 -6.08 -5.45 -5.38 -5.43 
        Evaluation #7 finished in 51.79s. cost:-5.43
        ------------------------
        Generating training data set #8.
        Cost convergence: -3.43 -3.91 -4.12 -4.55 -4.85 
        Evaluation #8 finished in 50.39s. cost:-4.85
        ------------------------
        Generating training data set #9.
        Cost convergence: -3.77 -4.18 -4.31 -4.37 -4.08 
        Evaluation #9 finished in 54.33s. cost:-4.08
        ------------------------
      
    2. After completing the initial training data construction, the $N-N_0$ optimization cycles begin. The following output is displayed sequentially during each optimization cycle.

        ###################################################
        Starting FMQA cycles... constraint_weight=6
        ###################################################
        FMQA Cycle #0 
        FM Training: 1.55s (corr:0.61)
        QUBO: 11.52s (AE execution: 5.05s)
        Cost convergence: -3.95 -3.91 -3.59 -3.63 -3.58 
        Evaluation #10 finished in 51.21s. cost:-3.58
        FMQA Cycle #0 finished in 64.31s, variable updated, pred_y=-3.58
        ------------------------
        FMQA Cycle #1 
        FM Training: 1.62s (corr:0.04)
        QUBO: 10.90s (AE execution: 5.00s)
        Cost convergence: -3.06 -3.52 -3.99 -4.14 -4.40 
        Evaluation #11 finished in 52.14s. cost:-4.40
        FMQA Cycle #1 finished in 64.69s, variable updated, pred_y=-4.40
        ------------------------
        FMQA Cycle #2 
        FM Training: 2.53s (corr:0.73)
        QUBO: 10.34s (AE execution: 4.92s)
        Cost convergence: -6.94 -5.87 -5.73 -5.74 -6.05 
        Evaluation #12 finished in 48.41s. cost:-6.05
        FMQA Cycle #2 finished in 61.32s, variable updated, pred_y=-6.05
        ------------------------
        FMQA Cycle #3 
        FM Training: 3.15s (corr:0.85)
        QUBO: 10.34s (AE execution: 4.93s)
        Cost convergence: -4.43 -5.11 -5.31 -5.06 -5.11 
        Evaluation #13 finished in 45.11s. cost:-5.11
        FMQA Cycle #3 finished in 58.63s
        ------------------------
        ・
        ・
        ・
        ------------------------
        FMQA Cycle #34 
        FM Training: 5.82s (corr:0.95)
        QUBO: 10.98s (AE execution: 4.97s)
        Cost convergence: -7.84 -8.14 -7.96 -7.85 -7.89 
        Evaluation #44 finished in 48.05s. cost:-7.89
        FMQA Cycle #34 finished in 64.88s
        ------------------------
        FMQA Cycle #35 
        FM Training: 9.23s (corr:0.95)
        QUBO: 14.44s (AE execution: 4.94s)
        Cost convergence: -7.47 -7.20 -6.80 -6.60 -6.76 
        Evaluation #45 finished in 59.24s. cost:-6.76
        FMQA Cycle #35 finished in 82.94s
        ------------------------
        FMQA Cycle #36 
        FM Training: 12.03s (corr:0.95)
        QUBO: 14.06s (AE execution: 4.99s)
        Cost convergence: -7.80 -8.14 -8.10 -8.20 -8.21 
        Evaluation #46 finished in 60.88s. cost:-8.21
        FMQA Cycle #36 finished in 87.03s, variable updated, pred_y=-8.21
        ------------------------
        ・
        ・
        ・
        ------------------------
        FMQA Cycle #88 
        FM Training: 8.81s (corr:0.92)
        QUBO: 10.11s (AE execution: 4.95s)
        Cost convergence: -5.62 -5.64 -5.83 -5.90 -5.96 
        Evaluation #98 finished in 45.61s. cost:-5.96
        FMQA Cycle #88 finished in 64.57s
        ------------------------
        FMQA Cycle #89 
        FM Training: 8.72s (corr:0.94)
        QUBO: 11.70s (AE execution: 4.99s)
        Cost convergence: -5.34 -5.79 -6.10 -5.93 -6.14 
        Evaluation #99 finished in 44.31s. cost:-6.14
        FMQA Cycle #89 finished in 64.76s
        ------------------------
        ###################################################
        Optimal input values and corresponding output
        ###################################################
        pred x: [0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.
         ... (remaining rows of the 960-element one-hot solution vector omitted) ...
         0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
        Cost convergence: -7.43 -7.56 -7.83 -7.85 -7.91 
        Evaluation #100 finished in 45.37s. cost:-7.91
        pred value: -7.91
        Elapsed time: 6319.51s
      

    • The output image of fmqa_solver.plot_history() described in "3.2. Transition of objective function values during the optimization process" is as follows, showing how the minimum value of the objective function is updated one after another during the black-box optimization process. For example, under an $N=100$ condition, the average vehicle speed with the traffic light control pattern obtained by the black-box optimization is about 25% higher than that with the best traffic light control found during the random search (initial training data construction, shown in blue).

      BBO_history

    • "3.3. MAS traffic simulation with optimized traffic light settings" displays an animation of the traffic simulation results obtained with the optimized traffic signal control. Below, traffic simulation results are shown with traffic light control based on (1) the final optimization result obtained in the optimization transition described above, and (2) the best traffic light control found during the random search. Both simulations use the same initial conditions.

      Unlike (1) the traffic light control based on the final optimization result, (2) the control based on the best result of the random search produces relatively long traffic jams at the intersections around the shopping malls, and vehicles are prevented from entering those intersections from neighboring ones.

      • (1) Signal control based on final optimization results (average vehicle speed for all vehicles: 8.55 m/s)
        BBO_sol1

      • (2) Traffic light control based on the best result in the random search process (control parameter corresponding to the evaluation indicated by the third blue circle in the transition diagram of the objective function above, average vehicle speed for all vehicles: 6.88 m/s)
        BBO_sol2

4. Summary and practical guidelines¶

In this example program, we performed optimization based on objective function values obtained from MAS traffic simulations. In actual signal control optimization, it may be more realistic to optimize based on daily traffic measurements rather than on simulations. In that case, a DevOps-like operation, in which optimization is performed while the system is running, can be considered. In other words:

  1. Change the traffic light control appropriately and acquire $N_0$ days of data (the set of control parameters and objective function values).
    (Or, use $N_0$ days of data measured in the past)

  2. Perform one cycle of black-box optimization based on $N_0$ data sets to obtain new (hopefully better) control parameters.

  3. Based on the new control parameters, control the traffic lights the next day and measure the objective function values. We add the measurement results (control parameters and objective function values) to the data set.

  4. Perform one cycle of black-box optimization based on the added data set and obtain new (hopefully even better) control parameters.

  5. Repeat steps 3 and 4 above.

Here, the objective function value does not necessarily have to be the negative value of the average vehicle speed used in this sample program --- a more easily measured value, such as the negative value of the number of vehicles entering certain intersections per unit time, can be used to perform similar optimization.
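The day-by-day operation in steps 1-5 can be sketched as follows. Here, measure and one_bbo_cycle are hypothetical stand-ins for a day of field measurement and one black-box optimization cycle (FM training plus QUBO solving), respectively, and the toy problem size is illustrative only:

```python
import numpy as np

rng = np.random.default_rng(0)
D = 8  # number of binary control parameters (toy size)

# Hypothetical stand-in for a day of field measurement: returns the
# negative "average speed" for a given control (toy linear model).
def measure(params):
    return -float(params @ np.arange(D))

# Hypothetical stand-in for one FMQA cycle (FM training + QUBO solve);
# here it merely perturbs the best control seen so far.
def one_bbo_cycle(X, y):
    best = X[int(np.argmin(y))].copy()
    best[rng.integers(0, D)] ^= 1  # flip one bit
    return best

# Step 1: N0 days of (possibly historical) data
N0, N = 5, 10
X = rng.integers(0, 2, size=(N0, D))
y = np.array([measure(x) for x in X])

# Steps 2-5: propose new control, deploy it the next day, measure,
# append the result to the data set, and repeat.
for day in range(N - N0):
    x_new = one_bbo_cycle(X, y)
    y_new = measure(x_new)  # step 3: next-day measurement
    X = np.vstack([X, x_new])
    y = np.append(y, y_new)

print(f"best objective after {N} days: {y.min():.1f}")
```

Since every day's measurement is appended to the data set, the surrogate model in a real FMQA cycle is retrained on all data accumulated so far, and the best objective value can only improve or stay the same over time.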

Based on such an optimization flow, as shown in "3.4. Example output from this FMQA sample program", the traffic situation is expected to improve day by day on average. However, on some days the optimization will suggest control parameters that perform poorly. This exploration is what prevents the optimization from falling into a local optimum, and it is an unavoidable aspect of this black-box optimization method. After a sufficient number of optimization cycles, however, the likelihood of obtaining such poorly performing control parameters is expected to decrease. The following optimization history, obtained under $N=400$ conditions, suggests the same.

BBO_history

Lastly, an approach that treats a similar traffic light control problem as a combinatorial optimization problem based on the QUBO formulation is presented here.